// Frequent (>1000) failure cause for different categories of events grouped by duration (short, medium, long)

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "mpi.h"
#include "hashmap_public.h"

#define LINEMAX 512 
#define NAMEMAX 32 
#define CHUNK 20480
#define ENCODED_NO 33
#define MAX_UNITS 10

#define JOB_TAG 0 
#define RESPONSE_TAG 1
#define QUERY_TAG 2

typedef struct location_unit {
    char location[NAMEMAX]; 
    unsigned long failures; 
} location_unit; 

typedef struct resp_payload {
    int unsigned count; 
    location_unit units[MAX_UNITS]; 
} resp_payload; 

typedef struct node_key {
    int unsigned node_id; 
    int unsigned platform_id; 
} node_key; 

typedef struct query_payload {
    node_key key; 
} query_payload; 

typedef struct node_value {
    char location[0]; 
} node_value; 

uint32_t node_hash(key generic_key) {
    node_key *k = (node_key *) generic_key; 
    return (k->node_id * k->platform_id); 
}

bool node_eq(key a, key b) {
    node_key *na = (node_key *)a; 
    node_key *nb = (node_key *)b; 
    return (na->node_id == nb->node_id) && (na->platform_id == nb->platform_id); 
}

typedef struct loc_key {
    char *name; 
} loc_key; 

typedef struct loc_value {
    int unsigned failures; 
} loc_value; 

uint32_t loc_hash(key generic_key) {
    loc_key *k = (loc_key *) generic_key; 
    return (uint32_t)k->name[0]; 
}

bool loc_eq(key a, key b) {
    loc_key *la = (loc_key *) a; 
    loc_key *lb = (loc_key *) b; 

    return strcmp(la->name, lb->name) == 0; 
}


/* for master */
hashmap *nodes_map;

/* for slaves: */
hashmap *locations; 
hashmap *nodes_cache; 
 
unsigned long* poz;
unsigned long recv[2];
unsigned long poz_size = 0;
unsigned long recv_m[ENCODED_NO];
unsigned long send_m[ENCODED_NO];

int failure_key[] = {-1, 0, 999, 1999, 2999, 3999, 4999, 5999, 6999, 7000, 7001};
char* failure_value[] = {"not reported", "reported as undetermined", "infrastructure", "hardware", "IO", "network", "software", "human error",
			 "user", "end of measurement", "TYPING"};
long duration_key[] = {1000, 100000, 2147483647};
char* duration_value[] = {"short", "medium", "long"};
char* encoded_values[] = {"short-not reported", "medium-not reported", "long-not reported", "short-reported as undetermined",
			  "medium-reported as undetermined", "long-reported as undetermined", "short-infrastructure", "medium-infrastructure",
			  "long-infrastructure", "short-hardware", "medium-hardware", "long-hardware", "short-IO", "medium-IO", "long-IO", "short-network",
			  "medium-network", "long-network", "short-software", "medium-software", "long-software", "short-human error",
			  "medium-human error", "long-human error", "short-user", "medium-user", "long-user", "short-end of measurement",
			  "medium-end of measurement", "long-end of measurement", "short-TYPING", "medium-TYPING", "long-TYPING"};

unsigned long file_length(FILE *f) {
    long pos;
    long end;

    pos = ftell (f);
    fseek (f, 0, SEEK_END);
    end = ftell (f);
    fseek (f, pos, SEEK_SET);

    return end;
}


void write_result (char *filename) {
    FILE *f;
    int i;

    f = fopen(filename, "wt");
    if (f == NULL) {
	fprintf(stderr, "Could not open file %s for write\n", filename);
	MPI_Abort(MPI_COMM_WORLD, 7777);
    }

    for(i = 0 ; i < ENCODED_NO ; i++) {
	fprintf(f, "%-40s", encoded_values[i]);
	fprintf(f, "%lu\n", recv_m[i]);
    }
}

void load_nodes_info(char *nodes_filename) {
    FILE *nodes_file;
    char buffer[LINEMAX]; 
    char node_name[NAMEMAX], node_ip[NAMEMAX], node_location[NAMEMAX]; 
    node_key *node; 
    node_value *value; 

    /* initialize hashmap */
    nodes_map = mk_hmap(node_hash, node_eq); 

    nodes_file = fopen(nodes_filename, "rt"); 
    if (nodes_file == NULL) { 
	fprintf(stderr, "Could not open nodes file %s \n", nodes_filename); 
	MPI_Abort(MPI_COMM_WORLD, 7777);
    }

    while (fgets(buffer, LINEMAX, nodes_file) != NULL) { 
	/* skip comments */
	if (buffer[0] == '#') 
	    continue; 
	
	node = malloc(sizeof(node_key)); 
	if (node == NULL) {
	    fprintf(stderr, "Not enough memory \n"); 
	    MPI_Abort(MPI_COMM_WORLD, 7777); 
	}
	
	sscanf(buffer, "%u %u %s %s %s", &node->node_id, &node->platform_id, node_name, node_ip, node_location);
	value = malloc(sizeof(node_value) + strlen(node_location) + 1); 
	if (value == NULL) { 
	    fprintf(stderr, "Not enough memory \n"); 
	    MPI_Abort(MPI_COMM_WORLD, 7777); 
	}

	strcpy(value->location, node_location); 
	if (hmap_add(nodes_map, node, value) == false) {
	    fprintf(stderr, "Hashmap out of memory \n"); 
	    MPI_Abort(MPI_COMM_WORLD, 7777); 
	} 
    }

    fclose(nodes_file); 

    /* 
    node_key k;
    k.node_id = 160; 
    k.platform_id = 3; 
    value = hmap_get(nodes_map, &k); 
    if (value == NULL) {
	printf("Error \n"); 
    } else {
	printf("Search result: %s \n", value->location); 
    }
    */
}

int init(char *filename) {
    unsigned long file_len = 0;
    unsigned long i, j;
    FILE *f;

    f = fopen(filename, "rt");
    if (f == NULL) {
	fprintf(stderr, "Could not open file init.\n");
	MPI_Abort(MPI_COMM_WORLD, 7777);
    }
    file_len = file_length(f);
    poz_size = file_len / CHUNK + 1;

    poz = (unsigned long*) malloc(poz_size * sizeof(unsigned long));
    poz[0] = 0;
    for (i = CHUNK, j = 1; j < poz_size - 1; i += CHUNK, j++) {
	poz[j] = i;
    }
    poz[j] = file_len;

    return 1;
}

int main(int argc, char** argv) {

    int rank, size;
    MPI_Status status;
    MPI_Request req;
    int i, flag;
    void *buffer = malloc(sizeof(resp_payload));
    resp_payload *resp; 
    query_payload *query; 

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    if (rank == 0) {
	// master inits
	init(argv[1]);
	load_nodes_info(argv[2]); 
    }

    MPI_Barrier(MPI_COMM_WORLD);

    // master sends indexes, receives results
    if(rank == 0) {

	char null_location = 0;
	for(i = 0 ; i < ENCODED_NO ; i++) {
	    recv_m[i] = 0;
	    send_m[i] = 0;
	}

	// sends indexes
	for (i = 0; i < poz_size - 1; i++) {
	    MPI_Isend(poz+i, 2, MPI_UNSIGNED_LONG, i%(size-1)+1, JOB_TAG, MPI_COMM_WORLD, &req);
	}

	// receive results
	unsigned long recv_temp[ENCODED_NO];
	while(1) {
	    
	    for(i = 0 ; i < ENCODED_NO ; i++)
		recv_temp[i] = 0;
	    
	    MPI_Irecv(buffer, ENCODED_NO, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &req);
	    
	    MPI_Test(&req, &flag, &status);
	    int times = 0;
	    while(!flag && times < 10) {
		times++;
		sleep(1);
		MPI_Test(&req, &flag, &status);
	    }
	    if(!flag)
		break;
	    
	    if (status.MPI_TAG == RESPONSE_TAG) { 
		for(i = 0 ; i < ENCODED_NO ; i++) {
		    recv_m[i] += recv_temp[i];
		}
		printf("Response received \n"); 

	    } else if (status.MPI_TAG == QUERY_TAG) { 
		query = (query_payload *)buffer;
		node_value *value = hmap_get(nodes_map, query); 
		if (value != NULL) { 
		    MPI_Send(value->location, NAMEMAX, MPI_CHAR, status.MPI_SOURCE, QUERY_TAG, MPI_COMM_WORLD); 
		} else {
		    MPI_Send(&null_location, 1, MPI_CHAR, status.MPI_SOURCE, QUERY_TAG, MPI_COMM_WORLD); 
		}
	    }
	}

	// write results
	write_result(argv[3]);
	free(poz);

	free_hmap(nodes_map); 
    }

    // slaves receive indexes, do computation
    else {

	locations = mk_hmap(loc_hash, loc_eq); 
	nodes_cache = mk_hmap(node_hash, node_eq); 
	while(1) {
	    
	    // receive until there's nothing left
	    MPI_Irecv(recv, 2, MPI_UNSIGNED_LONG, 0, JOB_TAG, MPI_COMM_WORLD, &req);
	    
	    MPI_Test(&req, &flag, &status);
	    int times = 0;
	    while(!flag && times < 10) {
		times++;
		sleep(1);
		MPI_Test(&req, &flag, &status);
	    }
	    if(!flag)
		break;

	    // read file and do computation
	    FILE *f;

	    f = fopen(argv[1], "rt");
	    if (f == NULL) {
		fprintf(stderr, "Could not open file.\n");
		MPI_Abort(MPI_COMM_WORLD, 7777);
	    }
	    
	    fseek (f, recv[0], SEEK_SET);

	    // go to the end of the line
	    char * line = NULL;
	    size_t len = 0;
	    ssize_t read;
	    unsigned long count = 0;

	    for(i = 0 ; i < ENCODED_NO ; i++) {
		send_m[i] = 0;
		recv_m[i] = 0;
	    }

	    while ((read = getline(&line, &len, f)) != -1) {

		// skip the comment line and the half lines
		count++;
		if(count == 1)
		    continue;

		// compute
		//char response;

		node_key node; 
		char delims[] = " ";
		char *result = NULL;
		result = strtok( line, delims );
		int token_no = 0;
		while( result != NULL ) {
		    token_no++;
		    if(token_no == 2) {
			node.node_id = atoi(result);
		    }
		    if(token_no == 3) {
			node.platform_id = atof(result);
		    }
		    result = strtok( NULL, delims );
		}
		
		node_value *value = hmap_get(nodes_cache, &node); 
		
		if (value == NULL) { 
		    char location[NAMEMAX]; 
		    MPI_Send(&node, sizeof(node_key), MPI_CHAR, 0, QUERY_TAG, MPI_COMM_WORLD); 
		    MPI_Recv(location, NAMEMAX, MPI_CHAR, 0, QUERY_TAG, MPI_COMM_WORLD, &status); 
		    value = malloc(sizeof(node_value) * strlen(location) + 1); 
		    node_key *new_node = malloc(sizeof(node_key)); 
		    new_node->node_id = node.node_id; 
		    new_node->platform_id = node.platform_id; 
		    strcpy(value->location, location); 
		    hmap_add(nodes_cache, new_node, value); 
		    if (strlen(location) > 0) { 
			/* add to the locations set */
			loc_key *loc = malloc(sizeof(loc_key)); 
			loc->name = value->location; 
			loc_value *locval = malloc(sizeof(loc_value)); 
			locval->failures = 1; 
			hmap_add(locations, loc, locval); 
			printf("Location is %s \n", location); 
		    }
		} else {
		    printf("Found in local cache %s \n", value->location); 
		    if (strlen(value->location) > 0) { 
			loc_key *loc = malloc(sizeof(loc_key)); 
			loc->name = value->location; 
			loc_value *loc_data = hmap_get(locations, loc); 
			if (loc_data != NULL) {
			    loc_data->failures++; 
			}
		    }
		}
		// finished reading the chunk?
		if(ftell(f) > recv[1])
		    break;
		
	    }

	    // send result to master
	    MPI_Isend(send_m, ENCODED_NO, MPI_UNSIGNED_LONG, 0, RESPONSE_TAG, MPI_COMM_WORLD, &req);
	}
    }

    // block and finish
    MPI_Barrier(MPI_COMM_WORLD);

    
    MPI_Finalize();
    return 0; 
    } 

